package com.atguigu.flink.wordcount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Created by Smexy on 2023/11/7

 使用DataSet API进行批处理。
 ---------------------------
 套路：
    所有的计算框架，不管是mr,spark都是一个套路
    1）创建环境
               mr          spark             flink DataSet API
            Driver      SparkContext            ExecutionEnvironment
    2) 读取数据，封装为数据模型
              mr         spark       flink DataSet API
             K-V          RDD             DataSource
    3) 处理数据模型
            mr                  spark                 flink
        Mapper.map()          RDD.转换算子()       DataSource.算子(业务功能)
        Reducer.reduce()      RDD.行动算子()
 */
public class Demo1_BatchDemo
{
    public static void main(String[] args) throws Exception {
        //1.创建环境
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        //2.读取数据，封装为数据模型
        DataSource<String> dataSource = environment.readTextFile("E:\\sz230619\\data\\words.txt");
        //3.读取文件中每一行的内容，切分为N个(单词,1)
        dataSource.
            /*
                Tuple: flink中提供的集合类型，可以存N个字段。
                    Tuple2： 是一个可以存2个字段的集合
                    Tuple3： 是一个可以存3个字段的集合
                    ...
                    Tuple25: 是一个可以存25个字段的集合
             */
            flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>()
            {
                /*
                    String value: 输入的一行内容。从上游发送到这里。
                    Collector<Tuple2<String, Integer>> out： 输出结果的收集器。
                            只需要把输出的结果收到到out中，它帮你把数据发送到下一个处理算子中
                 */
                @Override
                public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                    String[] words = value.split(" ");
                    Arrays.stream(words)
                              .forEach(
                                  word -> out.collect(Tuple2.of(word,1))
                              );

                }
            })
            //由于数据是tuple所以，需要使用groupBy(int... fields)
            .groupBy(0)
            //对Tuple2的第二列进行累积
            .sum(1)
            .print();

    }
}
